package com.yh.blink.metric

import java.util.Properties

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object MetricTest {

  /**
   * Entry point: consumes strings from a source Kafka topic, records custom
   * metrics (a counter of processed records and a gauge of the last record's
   * length), and forwards each record unchanged to a target Kafka topic.
   *
   * Recognized CLI arguments (parsed with [[ParameterTool]]):
   *   --source.kafka.bootstrap.servers  Kafka brokers for the consumer
   *   --source.kafka.zookeeper.connect  ZooKeeper quorum for the consumer
   *   --source.group.id                 consumer group id
   *   --brokerList                      Kafka brokers for the producer
   *   --sourceTopicName                 topic to read from
   *   --targetTopicName                 topic to write to
   *   --jobName                         Flink job name
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    checkPointConfig(env)

    // Parse arguments. ParameterTool.get returns null for an absent key, which
    // matches the previous "if has(key) then get(key) else null" behavior.
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val brokerList      = params.get("brokerList")
    val jobName         = params.get("jobName")
    val sourceTopicName = params.get("sourceTopicName")
    val targetTopicName = params.get("targetTopicName")

    // Consumer properties: only set keys that were actually supplied.
    // NOTE: the original code repeated these setProperty calls unguarded after
    // the checks; Properties.setProperty throws NullPointerException on a null
    // value, so omitting an argument crashed the job at startup. Fixed by
    // keeping only the guarded versions.
    val pros = new Properties()
    if (params.has("source.kafka.bootstrap.servers")) {
      pros.setProperty("bootstrap.servers", params.get("source.kafka.bootstrap.servers"))
    }
    if (params.has("source.kafka.zookeeper.connect")) {
      pros.setProperty("zookeeper.connect", params.get("source.kafka.zookeeper.connect"))
    }
    if (params.has("source.group.id")) {
      pros.setProperty("group.id", params.get("source.group.id"))
    }

    // Make the job parameters visible in the Flink web UI.
    env.getConfig.setGlobalJobParameters(params)

    val cons = new FlinkKafkaConsumer011[String](sourceTopicName, new SimpleStringSchema(), pros)
    cons.setStartFromEarliest()

    val myProducer = new FlinkKafkaProducer011[String](
      brokerList,              // broker list
      targetTopicName,         // target topic
      new SimpleStringSchema)  // serialization schema

    val stream = env.addSource(cons).name("source-kafka: " + sourceTopicName)

    stream.map(new RichMapFunction[String, String] {
      // Created in open(); @transient because the function instance is serialized
      // when the job graph is shipped to the task managers.
      @transient private var counter: Counter = _
      @transient private var valueToExpose = 0

      override def open(parameters: Configuration): Unit = {
        // Counter of records seen by this subtask.
        counter = getRuntimeContext()
          .getMetricGroup()
          .addGroup("MyMetric")
          .counter("myCounter")

        // Gauge exposing the length of the most recently processed record.
        getRuntimeContext()
          .getMetricGroup()
          .addGroup("MyMetric")
          .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int](() => valueToExpose))
      }

      override def map(value: String): String = {
        counter.inc()
        valueToExpose = value.length
        value // records pass through unchanged
      }
    }).name("metric map")
      .addSink(myProducer).name("sink-kafka: " + targetTopicName)

    env.execute(jobName)
  }

  /**
   * Configures checkpointing on the given environment: 60s interval,
   * at-least-once mode, at most one concurrent checkpoint, 500ms minimum pause,
   * tolerate checkpoint failures, and retain externalized checkpoints on job
   * cancellation.
   *
   * @param env the stream execution environment to configure
   */
  def checkPointConfig(env: StreamExecutionEnvironment): Unit = {
    // The original enabled checkpointing with a 1000ms interval and then
    // immediately overrode it via setCheckpointInterval(60 * 1000); the single
    // call below yields the same effective 60s interval.
    env.enableCheckpointing(60 * 1000)
    val conf = env.getCheckpointConfig
    conf.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    conf.setMinPauseBetweenCheckpoints(500)
    conf.setFailOnCheckpointingErrors(false)
    conf.setMaxConcurrentCheckpoints(1)
    conf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}
